任务队列:NestJS 中的队列方案 @nestjs/bull
定时任务适合处理周期性的轻量级操作,但对于高并发、秒级精度或需要重试机制的任务场景,需要引入消息队列方案。NestJS 官方推荐使用 @nestjs/bull 模块,它基于 Bull 库,以 Redis 作为消息存储后端,封装了 Producer(生产者)、Consumer(消费者)和 Listener(监听者)三大角色,提供简洁的 API 接口。
定时任务 vs 任务队列
| 特性 | @nestjs/schedule | @nestjs/bull |
|---|---|---|
| 触发方式 | 时间驱动(Cron 表达式) | 事件驱动(添加 Job) |
| 精度 | 分钟级 | 毫秒级(delay) |
| 并发 | 受限于主进程内存 | 独立 Worker,支持并发控制 |
| 重试 | 需自行实现 | 内置 attempts + backoff |
| 持久化 | 无 | Redis 存储 |
| 适用场景 | 周期性轻量任务 | 高并发、延迟执行、需重试的任务 |
安装依赖
npm install @nestjs/bull bull
# 或指定版本
npm install @nestjs/bull@10 bull@4.13.0
bash
版本说明:
@nestjs/bullv10 对应bullv4.x。如果安装时大版本号不一致,请锁定版本以保持兼容。
Redis 服务准备
Bull 依赖 Redis 作为存储后端,可以通过 Docker 快速启动:
docker run -d --name redis -p 6379:6379 redis:7-alpine
bash
在 .env 文件中配置 Redis 连接信息和队列开关:
# Queue configuration
QUEUE_ON=false
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
env
创建队列模块
使用 NestJS CLI 创建队列模块:
npx nest g module common/conditional/q --no-spec
bash
全局注册 -- BullModule.forRoot
forRoot 方法注册全局的 Redis 连接配置,所有队列共享:
// q.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
password: 'your_password',
},
}),
],
})
export class QModule {}
typescript
注册单个队列 -- BullModule.registerQueue
每个队列通过 registerQueue 方法独立注册,支持覆盖全局配置:
BullModule.registerQueue({
name: 'test_queue',
})
typescript
核心 API 与角色
Bull 队列模型包含三个核心角色:
Producer (生产者) Queue (队列) Consumer (消费者)
| | |
| queue.add(job) | Redis Storage | @Processor
|----------------------->| | @Process
| |----------------------->|
| |
| Listener (监听者)
| @OnQueueCompleted
| @OnQueueFailed
text
Producer -- 添加任务
在 Controller 或 Service 中注入队列实例,调用 add 方法添加任务:
import { Controller, Get } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
@Controller()
export class AppController {
constructor(
@InjectQueue('test_queue') private queue: Queue,
) {}
@Get('test-queue')
async testQueue() {
await this.queue.add({ foo: 'bar', count: 123 });
return { status: 'ok' };
}
}
typescript
Consumer -- 消费任务
使用 @Processor 装饰器标识消费者类,@Process 装饰器标识处理方法:
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('test_queue')
export class TestConsumer {
@Process()
async handleJob(job: Job<{ foo: string; count: number }>) {
console.log('Processing job:', job.id);
console.log('Job data:', job.data);
// Execute business logic
}
}
typescript
Consumer 必须注册到对应模块的 providers 中:
@Module({
providers: [TestConsumer],
})
export class QModule {}
typescript
Job Options 详解
添加任务时可以通过第三个参数配置丰富的选项:
await queue.add('send-email', { to: 'user@example.com' }, {
priority: 1, // 1 (highest) - MAX_INT (lowest)
delay: 5000, // Delay 5 seconds before processing (ms)
attempts: 3, // Retry up to 3 times on failure
backoff: {
type: 'exponential', // 'exponential' or 'fixed'
delay: 2000, // Initial backoff delay (ms)
},
lifo: false, // true = Last In First Out
timeout: 60000, // Job timeout (ms)
jobId: 'unique-id', // Custom job ID
removeOnComplete: true, // Auto-remove on success
removeOnFail: false, // Keep failed jobs for debugging
stackTraceLimit: 10, // Limit stack trace depth
});
typescript
常用选项说明:
| 选项 | 默认值 | 说明 |
|---|---|---|
priority | 0 | 优先级,数值越小优先级越高 |
delay | 0 | 延迟执行时间(毫秒) |
attempts | 0 | 失败重试次数 |
backoff | - | 退避策略,支持指数退避和固定间隔 |
lifo | false | 设为 true 后队列变为后进先出 |
removeOnComplete | false | 完成后自动删除任务记录 |
removeOnFail | false | 失败后自动删除任务记录 |
Redis 连接模式
Bull 基于 ioredis,支持以下连接模式:
- 单节点模式 -- 最简单,适合开发和测试
- 哨兵模式(Sentinel) -- 高可用,自动故障转移
- 集群模式(Cluster) -- 分布式,需自定义
connect函数
// Sentinel mode
BullModule.forRoot({
redis: {
sentinels: [
{ host: 'sentinel1', port: 26379 },
{ host: 'sentinel2', port: 26379 },
],
name: 'mymaster',
},
});
typescript
Job 实例方法
在 Consumer 中,job 对象提供了丰富的操作方法:
@Process()
async handleJob(job: Job) {
await job.updateProgress(50); // Update progress (0-100)
await job.update({ status: 'processing' }); // Update data
const data = job.data; // Get job data
const id = job.id; // Get job ID
await job.remove(); // Remove job from queue
const state = await job.getState(); // 'completed' | 'failed' | 'delayed' | etc.
}
typescript
测试验证
- 启动 Redis 服务和 NestJS 调试进程
- 向
GET /test-queue发送请求,添加任务到队列 - 在控制台查看 Consumer 的输出日志,确认任务已被消费
- 任务处理完成后控制台输出停止
本节总结
- 了解了定时任务的局限性,引入
@nestjs/bull作为任务队列方案 - Bull 基于 Redis 存储队列数据,支持 Producer-Consumer-Listener 模式
- 掌握了
BullModule.forRoot()全局配置和registerQueue()队列注册 - 使用
@InjectQueue注入队列实例添加任务,@Processor+@Process消费任务 - Job Options 提供了优先级、延迟、重试、退避等丰富的任务控制能力
↑